1 /* 2 * Collie - An asynchronous event-driven network framework using Dlang development 3 * 4 * Copyright (C) 2015-2017 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: putao's Dlang team 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module collie.channel.pipeline; 12 13 import std.typecons; 14 import std.variant; 15 import std.functional; 16 import std.range.primitives; 17 18 import collie.channel.handler; 19 import collie.channel.handlercontext; 20 import collie.channel.exception; 21 import collie.net; 22 import kiss.event; 23 24 interface PipelineManager 25 { 26 void deletePipeline(PipelineBase pipeline); 27 void refreshTimeout(); 28 } 29 30 abstract class PipelineBase 31 { 32 this() 33 { 34 } 35 36 ~this() 37 { 38 } 39 40 pragma(inline) 41 @property final void pipelineManager(PipelineManager manager) 42 { 43 _manager = manager; 44 } 45 46 pragma(inline,true) 47 @property final PipelineManager pipelineManager() 48 { 49 return _manager; 50 } 51 52 pragma(inline) 53 final void deletePipeline() 54 { 55 if (_manager) 56 { 57 _manager.deletePipeline(this); 58 } 59 } 60 61 pragma(inline) 62 @property final void transport(Channel transport) 63 { 64 _transport = transport; 65 } 66 67 pragma(inline,true) 68 @property final Channel transport() 69 { 70 return _transport; 71 } 72 73 pragma(inline) 74 final PipelineBase addBack(H)(H handler) 75 { 76 return addHelper(new ContextType!(H)(this, handler), false); 77 } 78 79 pragma(inline) 80 final PipelineBase addFront(H)(H handler) 81 { 82 return addHelper(new ContextType!(H)(this, handler), true); 83 } 84 85 pragma(inline) 86 final PipelineBase remove(H)(H handler) 87 { 88 return removeHelper!H(handler, true); 89 } 90 91 pragma(inline) 92 final PipelineBase remove(H)() 93 { 94 return removeHelper!H(null, false); 95 } 96 97 pragma(inline) 98 final PipelineBase removeFront() 99 { 100 if (_ctxs.empty()) 101 { 102 throw new PipelineEmptyException("No handlers in pipeline"); 103 } 104 removeAt(0); 105 return this; 106 } 107 108 pragma(inline) 109 final PipelineBase removeBack() 110 { 111 if (_ctxs.empty()) 112 { 113 throw new PipelineEmptyException("No handlers in pipeline"); 114 } 115 removeAt(_ctxs.length - 1); 116 return this; 117 } 118 119 pragma(inline) 120 final auto getHandler(H)(int i) 121 { 122 getContext!H(i).handler; 123 } 124 125 final auto getHandler(H)() 126 { 127 auto ctx = getContext!H(); 128 if (ctx) 129 return ctx.handler; 130 return null; 131 } 132 133 pragma(inline) 134 auto getContext(H)(int i) 135 { 136 auto ctx = cast(ContextType!H)(_ctxs[i]); 137 assert(ctx); 138 return ctx; 139 } 140 141 auto getContext(H)() 142 { 143 foreach (i; 0 .. _ctxs.length) 144 { 145 auto tctx = _ctxs.at(i); 146 auto ctx = cast(ContextType!H)(tctx); 147 if (ctx) 148 return ctx; 149 } 150 return null; 151 } 152 153 void finalize(); 154 155 final void detachHandlers() 156 { 157 foreach (i; 0 .. _ctxs.length) 158 { 159 auto ctx = _ctxs[i]; 160 ctx.detachPipeline(); 161 } 162 } 163 164 protected: 165 PipelineContext[] _ctxs; 166 PipelineContext[] _inCtxs; 167 PipelineContext[] _outCtxs; 168 169 bool _isFinalize = true; 170 private: 171 PipelineManager _manager = null; 172 Channel _transport; 173 // AsynTransportlogInfo _transportlogInfo; 174 175 final PipelineBase addHelper(Context)(Context ctx, bool front) 176 { 177 PipelineContext[] addBefore(PipelineContext[] ctxs, Context ctx){ 178 auto tctxs = new PipelineContext[ctxs.length + 1]; 179 tctxs[0] = ctx; 180 tctxs[1..$] = ctxs[0..$]; 181 return tctxs; 182 } 183 184 PipelineContext[] insertBack(PipelineContext[] ctxs, Context ctx){ 185 auto tctxs = new PipelineContext[ctxs.length + 1]; 186 tctxs[0..ctxs.length] = ctxs[0..$]; 187 tctxs[$ - 1] = ctx; 188 return tctxs; 189 } 190 191 _isFinalize = false; 192 _ctxs = front ? addBefore(_ctxs, ctx) : insertBack(_ctxs, ctx); 193 if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.IN) 194 { 195 _inCtxs = front ? addBefore(_inCtxs, ctx) : insertBack(_inCtxs, ctx); 196 } 197 if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.OUT) 198 { 199 _outCtxs = front ? addBefore(_outCtxs, ctx) : insertBack(_outCtxs, ctx); 200 } 201 return this; 202 } 203 204 final PipelineBase removeHelper(H)(H handler, bool checkEqual) 205 { 206 bool removed = false; 207 208 for (size_t i = 0; i < _ctxs.length; ++i) 209 { 210 auto ctx = cast(ContextType!H) _ctxs[i]; 211 if (ctx && (!checkEqual || ctx.getHandler() == handler)) 212 { 213 removeAt(i); 214 removed = true; 215 --i; 216 break; 217 } 218 } 219 if (!removed) 220 { 221 throw new HandlerNotInPipelineException("No such handler in pipeline"); 222 } 223 224 return *this; 225 } 226 227 final void removeAt(size_t site) 228 { 229 import kiss.container.array; 230 _isFinalize = false; 231 PipelineContext rctx = _ctxs[site]; 232 rctx.detachPipeline(); 233 removeSite(_ctxs,site); 234 //_ctxs.removeSite(site); 235 236 import std.algorithm.searching; 237 238 const auto dir = rctx.getDirection(); 239 if (dir == HandlerDir.BOTH || dir == HandlerDir.IN) 240 { 241 arrayRemove(_inCtxs,rctx,true); 242 // _inCtxs.removeOne(rctx); 243 } 244 245 if (dir == HandlerDir.BOTH || dir == HandlerDir.OUT) 246 { 247 arrayRemove(_inCtxs,rctx,true); 248 //_outCtxs.removeOne(rctx); 249 } 250 } 251 } 252 253 /* 254 * R is the inbound type, i.e. inbound calls start with pipeline.read(R) 255 * W is the outbound type, i.e. outbound calls start with pipeline.write(W) 256 * 257 * Use Unit for one of the types if your pipeline is unidirectional. 258 * If R is void, read(), will be disabled. 259 * If W is Unit, write() and close() will be disabled. 260 */ 261 262 final class Pipeline(R, W = void) : PipelineBase 263 { 264 alias Ptr = Pipeline!(R, W); 265 266 static Ptr create() 267 { 268 return new Ptr(); 269 } 270 271 ~this() 272 { 273 // if (!_isStatic) // USE GC, maybe the contex will free before pipeline 274 // { 275 // detachHandlers(); 276 // } 277 } 278 279 pragma(inline) 280 void read(R msg) 281 { 282 static if (!is(R == void)) 283 { 284 if (_front) 285 _front.read(msg); 286 else 287 throw new NotHasInBoundException("read(): not have inbound handler in Pipeline"); 288 } 289 } 290 291 pragma(inline,true) 292 void timeOut() 293 { 294 static if (!is(R == void)) 295 { 296 if (_front) 297 _front.timeOut(); 298 else 299 throw new NotHasInBoundException("timeOut(): not have inbound handler in Pipeline"); 300 } 301 } 302 303 pragma(inline) 304 void transportActive() 305 { 306 static if (!is(R == void)) 307 { 308 if (_front) 309 { 310 _front.transportActive(); 311 } 312 } 313 } 314 315 pragma(inline) 316 void transportInactive() 317 { 318 static if (!is(R == void)) 319 { 320 if (_front) 321 { 322 _front.transportInactive(); 323 } 324 } 325 } 326 327 static if (!is(W == void)) 328 { 329 alias TheCallBack = void delegate(W, size_t); 330 pragma(inline) 331 void write(W msg, TheCallBack cback = null) 332 { 333 334 if (_back) 335 _back.write(msg, cback); 336 else 337 throw new NotHasOutBoundException("close(): no outbound handler in Pipeline"); 338 } 339 } 340 341 pragma(inline) 342 void close() 343 { 344 static if (!is(W == void)) 345 { 346 if (_back) 347 _back.close(); 348 else 349 throw new NotHasOutBoundException("close(): no outbound handler in Pipeline"); 350 } 351 } 352 353 override void finalize() 354 { 355 if (_isFinalize) 356 return; 357 _front = null; 358 static if (!is(R == void)) 359 { 360 if (!_inCtxs.empty()) 361 { 362 _front = cast(InboundLink!R)(_inCtxs[0]); 363 for (size_t i = 0; i < _inCtxs.length - 1; i++) 364 { 365 _inCtxs[i].setNextIn(_inCtxs[i + 1]); 366 } 367 _inCtxs[_inCtxs.length - 1].setNextIn(null); 368 } 369 } 370 371 _back = null; 372 static if (!is(W == void)) 373 { 374 375 if (!_outCtxs.empty()) 376 { 377 _back = cast(OutboundLink!W)(_outCtxs[_outCtxs.length - 1]); 378 for (size_t i = _outCtxs.length - 1; i > 0; --i) 379 { 380 _outCtxs[i].setNextOut(_outCtxs[i - 1]); 381 } 382 _outCtxs[0].setNextOut(null); 383 } 384 } 385 386 for (int i = 0; i < _ctxs.length; ++i) 387 { 388 _ctxs[i].attachPipeline(); 389 } 390 391 if (_front is null && _back is null) 392 throw new PipelineEmptyException("No Handler in the Pipeline"); 393 394 _isFinalize = true; 395 } 396 397 protected: 398 this() 399 { 400 super(); 401 } 402 403 this(bool isStatic) 404 { 405 _isStatic = isStatic; 406 super(); 407 } 408 409 private: 410 bool _isStatic = false; 411 412 static if (!is(R == void)) 413 { 414 InboundLink!R _front = null; 415 } 416 else 417 { 418 Object _front = null; 419 } 420 421 static if (!is(W == void)) 422 { 423 OutboundLink!W _back = null; 424 } 425 else 426 { 427 Object _back = null; 428 } 429 } 430 431 abstract shared class PipelineFactory(PipeLine) 432 { 433 PipeLine newPipeline(TcpStream transport); 434 } 435 436 alias AcceptPipeline = Pipeline!(TcpStream, uint); 437 abstract shared class AcceptPipelineFactory 438 { 439 AcceptPipeline newPipeline(TcpListener acceptor); 440 }